Let activities heartbeat during worker shutdown #2903

Open
baekgyu-kim wants to merge 1 commit into temporalio:master from baekgyu-kim:2075

Conversation

@baekgyu-kim

Contributor

What was changed

  • On the worker-shutdown path (heartbeatExecutor already shut down), HeartbeatContextImpl.heartbeat() now emits the heartbeat to the server before throwing ActivityWorkerShutdownException (previously: thrown without sending).
  • Throttled like the normal scheduled path (success → heartbeat interval, transient failure → short retry interval), reusing the last heartbeat's throttle state so a caller looping on heartbeat() cannot flood the server.
  • Send errors are swallowed (WARN-logged); ActivityWorkerShutdownException is always thrown, so a transient failure cannot mask the shutdown signal.
  • ActivityWorkerShutdownException Javadoc updated accordingly.

Why?

  • Before: on shutdown, heartbeat() threw immediately without contacting the server → no heartbeat during the awaitTermination grace period → server times the activity out and retries it → duplicate executions, despite the worker deliberately giving the activity time to finish.
  • After: the activity can keep heartbeating during the grace period (by catching the exception) → each call refreshes the server-side heartbeat deadline → no premature timeout/retry.
  • Design: emit-then-throw is the approach agreed in Add the ability to keep heartbeating while the worker is shutting down #2075; throttling keeps a manual heartbeat loop from flooding the server.

Checklist

  1. Closes Add the ability to keep heartbeating while the worker is shutting down #2075

  2. How was this tested:

    New HeartbeatContextImplTest cases:

    • emit-then-throw on the shutdown path
    • exception still thrown when the send fails (gRPC and non-gRPC)
    • repeated rapid calls throttled to a single send
    • prompt retry after a failed send (short interval, not a full one)
    • resend once the throttle interval elapses
    • throttle carry-over from a preceding normal heartbeat — fails if the throttle-state write is removed
    ./gradlew :temporal-sdk:test --tests "io.temporal.internal.activity.HeartbeatContextImplTest"
    ./gradlew :temporal-sdk:spotlessJavaCheck
    
  3. Any docs updates needed?

    • The behavior is documented in the ActivityWorkerShutdownException Javadoc.

@maciejdudko maciejdudko left a comment

Contributor

Hi @baekgyu-kim, thank you for your contribution! It's great to see someone taking on these long-standing issues. However, this is not the right implementation.

There should be a new worker option to enable heartbeating during shutdown. It should default to disabled, and when disabled, the behavior should be identical to the existing behavior for backward compatibility.

When the option is enabled, the heartbeat behavior should be identical to what happens during a normal heartbeat when the worker is not shutting down. There should be no additional code path that calls sendHeartbeatRequest in a different way; the existing mechanism should be used. The way to achieve that is to modify SyncActivityWorker.shutdown so that heartbeatExecutor.shutdown is called only after all outstanding activity tasks have finished executing.
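
The ordering described above can be illustrated with plain java.util.concurrent types. This is a minimal sketch under stated assumptions, not the SDK's SyncActivityWorker: the class HeartbeatShutdownSketch, its two executors, and the methods execute, canHeartbeat, and shutdown are hypothetical stand-ins chosen only to show the idea of deferring the heartbeat executor's shutdown until outstanding tasks have finished.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical stand-in for illustration; this is NOT the SDK's SyncActivityWorker.
final class HeartbeatShutdownSketch {
  private final ExecutorService activityExecutor = Executors.newCachedThreadPool();
  private final ScheduledExecutorService heartbeatExecutor =
      Executors.newSingleThreadScheduledExecutor();
  private final List<CompletableFuture<Void>> outstanding = new CopyOnWriteArrayList<>();
  private final boolean allowHeartbeatDuringShutdown;

  HeartbeatShutdownSketch(boolean allowHeartbeatDuringShutdown) {
    this.allowHeartbeatDuringShutdown = allowHeartbeatDuringShutdown;
  }

  // Starts an "activity task" and remembers it as outstanding.
  void execute(Runnable activity) {
    outstanding.add(CompletableFuture.runAsync(activity, activityExecutor));
  }

  // In this sketch a heartbeat is possible only while the heartbeat executor is open.
  boolean canHeartbeat() {
    return !heartbeatExecutor.isShutdown();
  }

  // Graceful shutdown: stop accepting tasks, then decide when to close the heartbeat executor.
  CompletableFuture<Void> shutdown() {
    activityExecutor.shutdown(); // already submitted tasks keep running
    CompletableFuture<Void> tasksDone =
        CompletableFuture.allOf(outstanding.toArray(new CompletableFuture<?>[0]));
    if (!allowHeartbeatDuringShutdown) {
      // Option disabled: close the heartbeat executor right away.
      heartbeatExecutor.shutdown();
      return tasksDone.handle((ignored, error) -> null);
    }
    // Option enabled: close it only once every outstanding task has finished,
    // whether the task succeeded or failed.
    return tasksDone.handle(
        (ignored, error) -> {
          heartbeatExecutor.shutdown();
          return null;
        });
  }
}
```

With the flag set to true, canHeartbeat() keeps returning true after shutdown() until the last outstanding task completes; with the flag set to false it returns false as soon as shutdown() has been called.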

If you need assistance with the implementation, feel free to reach out on the community Slack: either message me directly or post in the #java-sdk channel.

@baekgyu-kim

Contributor Author

Hi @maciejdudko,
Thank you for the thoughtful review! I've reworked the PR as suggested.

It now adds an experimental WorkerOptions.Builder#setActivityHeartbeatDuringShutdown option (default false, preserving the existing behavior).
When enabled, the heartbeat executor is shut down only after all outstanding activity tasks have finished executing, so heartbeats go through the existing mechanism without any separate code path.

Whenever you have a chance, I'd appreciate another look. Thanks again!

private PollerBehavior workflowTaskPollersBehavior;
private PollerBehavior activityTaskPollersBehavior;
private PollerBehavior nexusTaskPollersBehavior;
private boolean activityHeartbeatDuringShutdown;

Contributor

The field should be named allowActivityHeartbeatDuringShutdown, the options getter should be named getAllowActivityHeartbeatDuringShutdown, and the builder setter should be named setAllowActivityHeartbeatDuringShutdown. Apply this change consistently throughout the PR.

return null;
});
CompletableFuture<Void> shutdownFuture;
if (activityHeartbeatDuringShutdown) {

Contributor

When interruptTasks is true (shutdownNow was called instead of shutdown), it should behave as if heartbeat during shutdown was disabled.

Suggested change
- if (activityHeartbeatDuringShutdown) {
+ if (allowActivityHeartbeatDuringShutdown && !interruptTasks) {
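
The condition in this suggestion can be read as a small truth table. The sketch below is only an illustration: the class HeartbeatGateSketch and the method keepHeartbeating are hypothetical names, and the two parameters mirror the identifiers used in the comment.

```java
// Hypothetical helper for illustration; it only restates the suggested condition.
final class HeartbeatGateSketch {
  private HeartbeatGateSketch() {}

  /**
   * Returns true when the heartbeat executor should stay open until outstanding
   * activity tasks finish: the option is enabled AND the shutdown is graceful.
   */
  static boolean keepHeartbeating(
      boolean allowActivityHeartbeatDuringShutdown, boolean interruptTasks) {
    return allowActivityHeartbeatDuringShutdown && !interruptTasks;
  }
}
```

Only the combination (option enabled, interruptTasks false) yields true; the other three combinations fall back to the existing behavior.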

Comment on lines +380 to +382
* io.temporal.client.ActivityWorkerShutdownException}, unless {@link
* WorkerOptions.Builder#setActivityHeartbeatDuringShutdown(boolean)} is enabled, in which case
* heartbeats keep working until the activity tasks finish executing.<br>

Contributor

shutdownNow behavior stays the same; see the comment in SyncActivityWorker.

Suggested change
- * io.temporal.client.ActivityWorkerShutdownException}, unless {@link
- * WorkerOptions.Builder#setActivityHeartbeatDuringShutdown(boolean)} is enabled, in which case
- * heartbeats keep working until the activity tasks finish executing.<br>
+ * io.temporal.client.ActivityWorkerShutdownException}.<br>

Comment on lines +529 to +547
/**
* If enabled, activities can keep heartbeating while the worker is shutting down. The activity
* heartbeat executor is closed only after all outstanding activity tasks have finished
* executing, so {@link io.temporal.activity.ActivityExecutionContext#heartbeat(Object)} behaves
* exactly as it does while the worker is running: heartbeats are throttled and sent to the
* server, which keeps the server from timing the activity out during the {@link
* WorkerFactory#awaitTermination(long, java.util.concurrent.TimeUnit)} grace period.
*
* <p>Note that with this option enabled activities are no longer notified of the worker
* shutdown by an {@link io.temporal.client.ActivityWorkerShutdownException} thrown from {@code
* heartbeat}, so they are expected to complete within the termination grace period on their
* own.
*
* <p>Defaults to false, meaning that after shutdown is requested, {@link
* io.temporal.activity.ActivityExecutionContext#heartbeat(Object)} stops sending heartbeats and
* throws {@link io.temporal.client.ActivityWorkerShutdownException}.
*/
@Experimental
public Builder setActivityHeartbeatDuringShutdown(boolean activityHeartbeatDuringShutdown) {

Contributor

We don't want to document implementation details.

Suggested change
- /**
- * If enabled, activities can keep heartbeating while the worker is shutting down. The activity
- * heartbeat executor is closed only after all outstanding activity tasks have finished
- * executing, so {@link io.temporal.activity.ActivityExecutionContext#heartbeat(Object)} behaves
- * exactly as it does while the worker is running: heartbeats are throttled and sent to the
- * server, which keeps the server from timing the activity out during the {@link
- * WorkerFactory#awaitTermination(long, java.util.concurrent.TimeUnit)} grace period.
- *
- * <p>Note that with this option enabled activities are no longer notified of the worker
- * shutdown by an {@link io.temporal.client.ActivityWorkerShutdownException} thrown from {@code
- * heartbeat}, so they are expected to complete within the termination grace period on their
- * own.
- *
- * <p>Defaults to false, meaning that after shutdown is requested, {@link
- * io.temporal.activity.ActivityExecutionContext#heartbeat(Object)} stops sending heartbeats and
- * throws {@link io.temporal.client.ActivityWorkerShutdownException}.
- */
- @Experimental
- public Builder setActivityHeartbeatDuringShutdown(boolean activityHeartbeatDuringShutdown) {
+ /**
+ * If true, activities can keep heartbeating during graceful worker shutdown (see {@link
+ * io.temporal.worker.WorkerFactory#shutdown WorkerFactory.shutdown}). Defaults to false,
+ * which means that after graceful shutdown is requested, calling {@link
+ * io.temporal.activity.ActivityExecutionContext#heartbeat ActivityExecutionContext.heartbeat}
+ * does not send a heartbeat and instead throws {@link
+ * io.temporal.client.ActivityWorkerShutdownException ActivityWorkerShutdownException}. This
+ * option is ignored by non-graceful shutdown (see {@link
+ * io.temporal.worker.WorkerFactory#shutdownNow WorkerFactory.shutdownNow}).
+ *
+ * <p>Note that with this option enabled, activities are no longer notified of the worker
+ * shutdown by the {@link io.temporal.client.ActivityWorkerShutdownException
+ * ActivityWorkerShutdownException} exception, so they are expected to complete within the
+ * termination grace period on their own.
+ */
+ @Experimental
+ public Builder setAllowActivityHeartbeatDuringShutdown(boolean allowActivityHeartbeatDuringShutdown) {

Comment on lines +57 to +59
WorkflowExecution execution = WorkflowClient.start(workflow::execute);
started.get();
testWorkflowRule.getTestEnvironment().shutdown();

Contributor

There's a race condition here: the shutdown() call can go through before the activity worker receives the task, which prevents the activity from running and makes the test fail.

This feature will be easier to test using a standalone activity. It should work like this:

  1. Test starts activity.
  2. Test blocks on semaphore 1 until activity starts.
  3. Activity signals semaphore 1.
  4. Activity blocks on semaphore 2 until shutdown is triggered.
  5. Test calls shutdown().
  6. Test signals semaphore 2.
  7. Activity heartbeats then returns. (An exception will fail the activity.)
  8. Test calls result() on activity handle to ensure it succeeded. (Failure will throw exception and fail the test.)
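
The handshake in the steps above can be sketched with plain java.util.concurrent primitives. This is an illustration under stated assumptions, not a Temporal test: the single-thread executor stands in for the activity worker, the Future stands in for the activity handle, incrementing a counter stands in for the heartbeat call, and the names ShutdownHandshakeSketch and runHandshake are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins only; no Temporal APIs are used in this sketch.
final class ShutdownHandshakeSketch {
  private ShutdownHandshakeSketch() {}

  /** Runs the eight-step handshake and returns the number of recorded heartbeats. */
  static int runHandshake() {
    Semaphore activityStarted = new Semaphore(0); // "semaphore 1"
    Semaphore shutdownTriggered = new Semaphore(0); // "semaphore 2"
    AtomicInteger heartbeats = new AtomicInteger();
    ExecutorService worker = Executors.newSingleThreadExecutor();
    try {
      // Step 1: the test starts the activity.
      Future<String> handle =
          worker.submit(
              () -> {
                activityStarted.release(); // Step 3: activity signals semaphore 1.
                shutdownTriggered.acquire(); // Step 4: activity waits for shutdown.
                heartbeats.incrementAndGet(); // Step 7: heartbeat stand-in, then return.
                return "done";
              });
      // Step 2: the test blocks until the activity has started.
      if (!activityStarted.tryAcquire(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("activity never started");
      }
      worker.shutdown(); // Step 5: graceful shutdown; the running task continues.
      shutdownTriggered.release(); // Step 6: the test signals semaphore 2.
      handle.get(5, TimeUnit.SECONDS); // Step 8: throws if the activity failed.
      return heartbeats.get();
    } catch (Exception e) {
      throw new IllegalStateException("handshake failed", e);
    } finally {
      worker.shutdownNow(); // cleanup in case of an early failure
    }
  }
}
```

Because the test waits on semaphore 1 before calling shutdown, the task is guaranteed to be running when shutdown is requested, which removes the race described in the comment.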

* ActivityWorkerShutdownException}.
*/
@Test
public void testHeartbeatingActivityCompletesDuringShutdown()

Contributor

Also add a test case for when shutdownNow is called instead of shutdown.


Development

Successfully merging this pull request may close these issues.

Add the ability to keep heartbeating while the worker is shutting down

2 participants